18.9 其他
本节介绍与任务执行有关的几种暂停操作。
Gosched
可被用户调用的runtime.Gosched将当前G任务暂停,重新放回全局队列,让出当前M去执行其他任务。我们无须对G做唤醒操作,因为它总归会被某个M重新拿到,并从“断点”恢复。
proc.go
func Gosched() { mcall(gosched_m) }
proc1.go
func gosched_m(gp *g) {
goschedImpl(gp)
}
func goschedImpl(gp *g) {
// 重置属性
casgstatus(gp, _Grunning, _Grunnable)
dropg()
// 将当前 G 放回全局队列
globrunqput(gp)
// 重新调度执行其他任务
schedule()
}
func dropg() {
g := getg()
if g.m.lockedg == nil {
g.m.curg.m = nil
g.m.curg = nil
}
}
实现“断点恢复”的关键由mcall实现,它将当前执行状态,包括SP、PC寄存器等值保存到G.sched区域。
asm_amd64.s
TEXT runtime•mcall(SB), NOSPLIT, $0-8 MOVQ fn+0(FP), DI
get_tls(CX)
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
MOVQ BP, (g_sched+gobuf_bp)(AX)
// switch to m->g0 & its stack, call fn
...
当execute/gogo再次执行该任务时,自然可从中恢复状态。反正执行栈是G自带的,不用担心执行数据丢失。
gopark
与Gosched最大的区别在于,gopark并没将G放回待运行队列。也就是说,必须主动恢复,否则该任务会遗失。
proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, …) { mp := acquirem() gp := mp.curg mp.waitlock = lock mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) mcall(park_m) }
可看到gopark同样是由mcall保存执行状态,还有个unlockf作为暂停判断条件。
proc1.go
func park_m(gp *g) { g := getg() // 重置属性 casgstatus(gp, _Grunning, _Gwaiting) dropg() // 执行解锁函数。如果返回 false,则恢复执行 if g.m.waitunlockf != nil { fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&g.m.waitunlockf)) ok := fn(gp, g.m.waitlock) g.m.waitunlockf = nil g.m.waitlock = nil if !ok { casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns } } // 调度执行其他任务 schedule() }
与之配套,goready用于恢复执行,G被放回优先级最高的P.runnext。
proc.go
func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip) }) }
proc1.go
func ready(gp *g, traceskip int) { // 修正状态,重新放回本地 runnext casgstatus(gp, _Gwaiting, _Grunnable) runqput(g.m.p.ptr(), gp, true) }
notesleep
相比gosched、gopark,反应更敏捷的notesleep既不让出M,也就不会让G重回任务队列。它直接让线程休眠直到被唤醒,更适合stopm、gcMark这类近似自旋的场景。
在Linux、DragonFly、FreeBSD平台,notesleep是基于Futex的高性能实现。
Futex通常称作“快速用户区互斥”,是一种在用户空间实现的锁(互斥)机制。多执行单位(进程或线程)通过共享同一块内存(整数)来实现等待和唤醒操作。因为Futex只在操作结果不一致时才进入内核仲裁,所以有非常高的执行效率。
更多内容请参考man 2 futex。
runtime2.go
type m struct { park note } type note struct { // Futex-based impl treats it as uint32 key, while sema-based impl as M* waitm key uintptr }
围绕note.key值来处理休眠和唤醒操作。
lock_futex.go
func notesleep(n *note) { gp := getg() for atomicload(key32(&n.key)) == 0 { gp.m.blocked = true futexsleep(key32(&n.key), 0, -1) // 检查 n.key == 0,休眠 gp.m.blocked = false // 唤醒后 n.key == 1 } } func notewakeup(n *note) { // 如果 old != 0,表示已经执行过唤醒操作 old := xchg(key32(&n.key), 1) if old != 0 { throw(“notewakeup - double wakeup”) } // 唤醒后 n.key == 1 futexwakeup(key32(&n.key), 1) } // 重置休眠条件 func noteclear(n *note) { n.key = 0 }
os1_linux.go
func futexsleep(addr *uint32, val uint32, ns int64) { var ts timespec // 不超时 if ns < 0 { futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, nil, nil, 0) return } ts.set_sec(ns / 1000000000) ts.set_nsec(int32(ns % 1000000000)) // 如果 futex_value == val,则进入休眠等待状态,直到 FUTEX_WAKE 或超时 futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, unsafe.Pointer(&ts), nil, 0) } func futexwakeup(addr *uint32, cnt uint32) { // 唤醒 cnt 个等待单位,这会设置 futex_value = 1 ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE, cnt, nil, nil, 0) }
其他不支持Futex的Darwin、Windows等平台,可参阅lock_sema.go基于semaphore的实现。
Goexit
用户可调用runtime.Goexit立即终止G任务,不管当前处于调用堆栈的哪个层次。在终止前,它确保所有G.defer被执行。
panic.go
func Goexit() { gp := getg() for { d := gp._defer … freedefer(d) } goexit1() }
比较有趣的是在main goroutine里执行Goexit,它会等待其他goroutine结束后才会崩溃。
test.go
package main import ( “fmt” “runtime” “time” ) func main() { for i := 0; i < 3; i++ { go func(n int) { time.Sleep(time.Second * time.Duration(n+1)) fmt.Printf(“G%d end.\n”, n) }(i) } println(“Goexit.”) runtime.Goexit() println(“never execute.“) }
$ go build -o test test.go && ./test Goexit. G0 end. G1 end. G2 end. fatal error: no goroutines (main called runtime.Goexit) - deadlock! runtime stack: runtime.throw(0x52cdc0, 0x36) /usr/local/go/src/runtime/panic.go:527 +0x90 runtime.checkdead() /usr/local/go/src/runtime/proc1.go:2933 +0x1fb runtime.mput(0xc82002a900) /usr/local/go/src/runtime/proc1.go:3268 +0x46 runtime.stopm() /usr/local/go/src/runtime/proc1.go:1126 +0xdd runtime.findrunnable(0xc82001c000, 0x0) /usr/local/go/src/runtime/proc1.go:1530 +0x69e runtime.schedule() /usr/local/go/src/runtime/proc1.go:1639 +0x267 runtime.goexit0(0xc820001380) /usr/local/go/src/runtime/proc1.go:1765 +0x1a2 runtime.mcall(0x0) /usr/local/go/src/runtime/asm_amd64.s:204 +0x5b
stopTheWorld
本章的最后,我们看看导致整个进程用户逻辑停止的STW是如何实现的。
用户逻辑必须暂停在一个安全点上,否则会引发很多意外问题。因此,stopTheWorld同样是通过“通知”机制,让G主动停止。比如,设置“gcwaiting=1”让调度函数schedule主动休眠M;向所有正在运行的G任务发出抢占调度,使其暂停。
proc1.go
func stopTheWorld(reason string) { semacquire(&worldsema, false) getg().m.preemptoff = reason systemstack(stopTheWorldWithSema) } func stopTheWorldWithSema() { g := getg() sched.stopwait = gomaxprocs // 设置停止标志,让 schedule 之类的调用主动休眠 M atomicstore(&sched.gcwaiting, 1) // 向所有正在运行的 G 发出抢占调度 preemptall() // 暂停当前 P g.m.p.ptr().status = _Pgcstop sched.stopwait— // 尝试暂停所有 syscall 状态的 P for i := 0; i < int(gomaxprocs); i++ { p := allp[i] s := p.status if s == _Psyscall && cas(&p.status, s, _Pgcstop) { p.syscalltick++ sched.stopwait— } } // 处理空闲 P for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait— } wait := sched.stopwait > 0 // 等待 if wait { for { // 暂停 100us 后,重新发出抢占调度 // handoffp、gcstopm、entersyscall_gcwait 等操作都会 sched.stopwait—, // 如果 stopwait == 0 则尝试唤醒 stopnote // 若唤醒成功,跳出循环;失败,则重新发出抢占调度,再次等待 if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } preemptall() } } // 检查所有 P 状态 for i := 0; i < int(gomaxprocs); i++ { p := allp[i] if p.status != _Pgcstop { throw(“stopTheWorld: not stopped”) } } } // 向所有 P 发出抢占调度 func preemptall() bool { res := false for i := int32(0); i < gomaxprocs; i++ { p := allp[i] if p == nil || p.status != _Prunning { continue } if preemptone(p) { res = true } } return res }
总体上看,stopTheWorld还是很平和的一种手段,会循环等待目标任务进入一个安全点后主动暂停。而startTheWorld就更简单,毕竟是从冻结状态开始,无非是唤醒相关P/M继续执行任务。
proc1.go
func startTheWorld() { systemstack(startTheWorldWithSema) semrelease(&worldsema) getg().m.preemptoff = "" } func startTheWorldWithSema() { g := getg() // 检查是否需要 procresize p1 := procresize(procs) // 解除停止状态 sched.gcwaiting = 0 // 唤醒 sysmon if sched.sysmonwait != 0 { sched.sysmonwait = 0 notewakeup(&sched.sysmonnote) } // 循环有任务的 P 链表,让它们继续工作 for p1 != nil { p := p1 p1 = p1.link.ptr() if p.m != 0 { mp := p.m.ptr() p.m = 0 mp.nextp.set(p) notewakeup(&mp.park) } else { // Start M to run P. Do not start another M below. newm(nil, p) add = false } } // 让闲置的家伙都起来工作! if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { wakep() } // 重置抢占标志 if g.m.locks == 0 && g.preempt { g.stackguard0 = stackPreempt } }